package com.atguigu.yuntai.monitor.service.impl;

import cn.hutool.core.util.NumberUtil;
import cn.hutool.json.JSONUtil;

import com.atguigu.yuntai.monitor.bean.PartitionDef;
import com.atguigu.yuntai.monitor.bean.PartitionResult;
import com.atguigu.yuntai.monitor.bean.RuleInfoComputeWrapper;
import com.atguigu.yuntai.monitor.entity.RuleCollection;
import com.atguigu.yuntai.monitor.entity.RuleInfo;
import com.atguigu.yuntai.monitor.entity.RuleInfoCompute;
import com.atguigu.yuntai.monitor.exception.DslException;
import com.atguigu.yuntai.monitor.exception.HiveExecuteException;
import com.atguigu.yuntai.monitor.exception.HiveSqlException;
import com.atguigu.yuntai.monitor.service.*;
import com.atguigu.yuntai.monitor.sql.SqlBuilder;
import com.atguigu.yuntai.monitor.utils.DecimalUtil;
import com.googlecode.aviator.AviatorEvaluator;
import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * <p>ExecuteService的实现类</p>

 **/
@Service
@Slf4j
public class ExecuteServiceImpl implements ExecuteService {

    URI uri;

    @Autowired
    RuleCollectionService ruleCollectionService;

    @Autowired
    RuleInfoService ruleInfoService;

    @Autowired
    HiveServiceImpl hiveService;

    @Autowired
    AlertService alertService;


    @Autowired
    TriggerLogService triggerLogService;

    @Autowired
    RuleInfoComputeService ruleInfoComputeService;

    String swingExpr = "(result >= lower) && (result <= upper)";


    @PostConstruct
    public void initUri() {
        URL resource = this.getClass().getClassLoader().getResource("DSL.groovy");
        assert resource != null;
        uri = URI.create(resource.toString());
    }

    /**
     * 执行DSL(分区表达式)
     * @param dsl
     * @return
     */
    @SneakyThrows
    @Override
    public String runDsl(String dsl) {
        CompilerConfiguration compilerConfiguration = new CompilerConfiguration();
        compilerConfiguration.setSourceEncoding("UTF-8");
        Binding binding = new Binding();
        binding.setProperty("expr", dsl);
        GroovyShell groovyShell = new GroovyShell(binding, compilerConfiguration);
        String result;
        try {
            result = (String)groovyShell.evaluate(uri);
        }catch (Exception e){
            throw new DslException(e.getMessage(),e);
        }
        return result;
    }

    /**
     * 将分区表达式转变为分区结果
     * @param partitionDef
     * @return
     */
    @Override
    public PartitionResult partitionDef2Result(PartitionDef partitionDef) {
        String dsl = partitionDef.getDsl();
        String value = runDsl(dsl);
        return new PartitionResult()
                .setName(partitionDef.getName())
                .setType(partitionDef.getType())
                .setValue(value)
                ;
    }

    /**
     * 按照规则集合的id去触发
     * @param collectionId
     * @return
     */
    @Override
    @SneakyThrows
    public void execute(Long triggerId, Long collectionId) {
        // 0. 得到规则的集合
        String ifSuccess = "failed";
        RuleCollection ruleCollection = ruleCollectionService.getById(collectionId);
        List<RuleInfo> ruleInfoList = ruleInfoService.listAllRulesByCollectionId(collectionId);
        String database = ruleCollection.getHiveDatabase();
        String tableName = ruleCollection.getHiveTableName();
        List<RuleInfoCompute> ruleInfoComputeList = null;
        try{
            // 1. 将ruleInfo -> 转为 RuleInfoCompute
            ruleInfoComputeList = ruleInfo2Compute(triggerId, ruleInfoList);
            // 2. 按照dsl最终解析结果进行分组
            Map<PartitionResult, List<RuleInfoComputeWrapper>> map1 = groupByWhere(ruleInfoComputeList);
            // 3. 将key换成sql
            Map<String, List<RuleInfoComputeWrapper>> map2 = keyToSql(database, tableName, map1);
            System.out.println(JSONUtil.toJsonStr(map2));
            // 4. 执行最终的计算
            compute(map2, triggerId, collectionId);
            // 5. 将结果落盘
            ruleInfoComputeService.saveRuleInfoComputeList(ruleInfoComputeList);
            // 6. 对计算结果进行校验
            checkResult(triggerId,collectionId,ruleInfoComputeList);
            // 7. checkResult中更新了规则是否报警的状态,这个时候回头更新一把
            ruleInfoComputeService.updateBatchById(ruleInfoComputeList);
            // 7. 如果顺利执行到这一步，触发的执行状态应为success
            ifSuccess = "success";
        }catch (Exception e){
            // 8. 如果上述结果发生异常，则报警
            alertService.alert(triggerId,collectionId,"执行异常"+e.getMessage());
            throw new HiveExecuteException(e.getMessage(),e);
        }finally {
            // 9. 最终 触发这个规则集合的报警。
            alertService.triggerAlert(triggerId);
            // 10. 更新触发的执行状态
            triggerLogService.updateStatueById(triggerId,ifSuccess);
        }
    }
    
    public void checkResult(Long triggerLogId,Long collectionId,List<RuleInfoCompute> ruleInfoComputeList) {
        HashMap<String, Object> env = new HashMap<>();
        ruleInfoComputeList
                .forEach(ruleInfoCompute -> {
                    if(ruleInfoCompute.getFixedValue()!=null){
                        // 如果设置固定值了,算一下固定值，
                        BigDecimal checkResult = DecimalUtil.toDecimal19d6(ruleInfoCompute.getCheckResult());
                        BigDecimal fixedValue = DecimalUtil.toDecimal19d6(ruleInfoCompute.getFixedValue());
                        env.put("checkResult",checkResult);
                        env.put("fixedValue",fixedValue);
                        String expr = "checkResult " + ruleInfoCompute.getFixedCompareWay() + " fixedValue";
                        Boolean b = (Boolean)AviatorEvaluator.execute(expr, env);
                        env.clear();
                        if(!b){
                            // 如果条件不满足，就报警
                            String msg = "[违反固定值约束]受检分区结果为"+DecimalUtil.getString(checkResult) +
                                    " 不 " + ruleInfoCompute.getFixedCompareWay() + " " + DecimalUtil.getString(fixedValue);
                            alertService.alert(triggerLogId,collectionId,ruleInfoCompute.getId(),msg);
                            ruleInfoCompute.setIfAlert(true);
                        }
                    }
                    if(ruleInfoCompute.getBasePartitionDef()!=null){
                        // 如果设基准分区了，就算一下波动率
                        String lowerStr = ruleInfoCompute.getSwingLower();
                        String upperStr = ruleInfoCompute.getSwingUpper();
                        String checkResultStr = ruleInfoCompute.getCheckResult();
                        String baseResultStr = ruleInfoCompute.getBaseResult();
                        BigDecimal lower = DecimalUtil.toDecimal19d6(lowerStr);
                        BigDecimal upper = DecimalUtil.toDecimal19d6(upperStr);
                        BigDecimal baseResult = DecimalUtil.toDecimal19d6(baseResultStr);
                        BigDecimal checkResult = DecimalUtil.toDecimal19d6(checkResultStr);
                        BigDecimal result = NumberUtil.div(checkResult,baseResult);
                        env.put("result", result);
                        env.put("lower", lower);
                        env.put("upper", upper);
                        Boolean b = (Boolean)AviatorEvaluator.execute(swingExpr, env);
                        env.clear();
                        if (!b) {
                            // 如果条件不满足就报警 将拼字符串的时候需要用到的decimal转成字符串
                            String msg = "[违反波动率约束] 基准分区结果="+DecimalUtil.getString(baseResult) + " 受检分区结果=" + DecimalUtil.getString(checkResult)
                                    + " 受检/基准=" + DecimalUtil.getString(result) + "不在["+DecimalUtil.getString(lower)+","+DecimalUtil.getString(upper)+"]之间";
                            alertService.alert(triggerLogId, collectionId, ruleInfoCompute.getId(), msg);
                            ruleInfoCompute.setIfAlert(true);
                        }
                    }
                });
    }



    public void compute(Map<String, List<RuleInfoComputeWrapper>> map,Long triggerId, Long collectionId) throws HiveSqlException, HiveSqlException {
        for (Map.Entry<String, List<RuleInfoComputeWrapper>> entry : map.entrySet()) {
            Map<String, String> resultMap = hiveService.runSql(entry.getKey());
            List<RuleInfoComputeWrapper> value = entry.getValue();
            if(resultMap != null){
                for (int i = 0; i < resultMap.size(); i++) {
                    RuleInfoComputeWrapper ruleInfoComputeWrapper = value.get(i);
                    if("base".equals(ruleInfoComputeWrapper.getType())){
                        ruleInfoComputeWrapper.getRuleInfoCompute().setBaseResult(String.valueOf(resultMap.get("_c"+i)));
                    }else {
                        ruleInfoComputeWrapper.getRuleInfoCompute().setCheckResult(String.valueOf(resultMap.get("_c"+i)));
                    }
                }
            }else {
                for (RuleInfoComputeWrapper ruleInfoComputeWrapper : value) {
                    if ("base".equals(ruleInfoComputeWrapper.getType())) {
                        ruleInfoComputeWrapper.getRuleInfoCompute().setBaseResult(null);
                    }else {
                        ruleInfoComputeWrapper.getRuleInfoCompute().setCheckResult(null);
                    }
                }
            }
        }
    }

    /**
     * 类型转换一下 ruleInfo -> RuleInfoCompute(在mysql中对应一张表)
     * @param ruleInfoList
     * @return
     */
    public List<RuleInfoCompute> ruleInfo2Compute(Long triggerId,List<RuleInfo> ruleInfoList) {
        return ruleInfoList
                .stream()
                .map(ruleInfo -> new RuleInfoCompute(triggerId,ruleInfo))
                .collect(Collectors.toList());
    }

    /**
     * 将key转为sql
     * @return
     */
    public Map<String,List<RuleInfoComputeWrapper>> keyToSql(String database,String tableName,Map<PartitionResult, List<RuleInfoComputeWrapper>> map) {
        HashMap<String, List<RuleInfoComputeWrapper>> result = new HashMap<>();
        map.forEach(
            (partitionResult, ruleInfoComputeWrappers) -> {
                String sql = new SqlBuilder(database, tableName, partitionResult.toString(), ruleInfoComputeWrappers)
                        .build();
                result.put(sql,ruleInfoComputeWrappers);
            }
        );
        return result;
    }


    /**
     * 按照DSL实际的执行结果进行分组
     * 0. 既执行了分区表达式解析
     * 1. 又对计算结果进行了封装
     */
    public Map<PartitionResult, List<RuleInfoComputeWrapper>> groupByWhere(List<RuleInfoCompute> ruleInfoComputesList ) {
        return ruleInfoComputesList
                .stream()
                .flatMap(
                    // 包装
                    ruleInfoCompute -> {
                        Stream.Builder<RuleInfoComputeWrapper> builder = Stream.builder();
                        if (ruleInfoCompute.getBasePartitionDef() != null) {
                            builder.add(new RuleInfoComputeWrapper("base", ruleInfoCompute));
                        }
                        if (ruleInfoCompute.getCheckPartitionDef() != null) {
                            builder.add(new RuleInfoComputeWrapper("check", ruleInfoCompute));
                        }
                        return builder.build();
                    }
                )
                .collect(
                        // 分组
                        Collectors.groupingBy(
                            ruleInfoComputeWrapper -> {
                                if ("base".equals(ruleInfoComputeWrapper.getType())) {
                                    return partitionDef2Result(ruleInfoComputeWrapper.getRuleInfoCompute().getBasePartitionDef());
                                } else {
                                    return partitionDef2Result(ruleInfoComputeWrapper.getRuleInfoCompute().getCheckPartitionDef());
                                }
                            }
                        )
                );
    }
}
